Lambda でリソースベースポリシーのプリンシパルを監視してみた
はじめに
こんにちは、さすけです!
以前、名古屋の勉強会 に登壇させていただいた際に、リソースベースポリシーのプリンシパルが自動的に変更されることがあるという話をしました(詳しくは下で説明します)。
その際、質問をいただきましたので検証をおこなってみました。少し長くなりますが、お付き合いください!
プリンシパルが自動で変わる!?
まず、先述しておりました「プリンシパルが自動で変わる」というのは、どういう状況なのでしょうか?
これは「プリンシパル が AIDA あるいは AROA から始まる文字列に置き換わる」という事象です。
AWS の re:Post には下記のような記載がありました。
==== 抜粋 ====
リソースベースのポリシーの一意のプリンシパル ID は、IAM ユーザーまたはロールが削除されたことを示します。AWS が有効な ARN にマッピングし直すことができないため、プリンシパル ID が表示されます。
==== 抜粋 ====
プリンシパルに設定していた IAM ユーザーあるいは IAM ロールが削除されてしまったことが原因のようですね。IAM ユーザーが削除された場合には AIDA
、IAM ロールが削除された場合には AROA
に置き換わります。
「AWS が有効な ARN にマッピングし直すことができないため」ということは、プリンシパルに限らず ARN で指定されている箇所ではこの事象が起こりうるということですね。
実際に置き換わったポリシーは以下のようになります。
"Principal": {
"AWS": [
"AIDAxxxxxxxxxxxxxxxxx",
"AROAxxxxxxxxxxxxxxxxx"
]
}
これをそのままにしておくと、ポリシーの変更を保存できなかったりするので、プリンシパルから削除するか有効な ARN に置き換えることが望ましいと言えます。
いただいた質問
質問の内容は「AIDA や AROA に置き換わったことを検知する方法はありますか?」という内容でした。
検証したことがなかったので、その場ではっきりとした回答ができませんでした(すみません、、)。
結論から申し上げますと 可能です。
ただ、少し手間がかかりますのであらかじめご了承ください。
検証
検知および通知に使用するサービスは、Lambda、EventBridge、SNS です。
概要としては、EventBridge をトリガーとした Lambda を作成し、スケジュール実行(今回の検証では週に 1 回)させるというものになります。
検証用リソースベースポリシーとして、S3 バケットのバケットポリシーを使用しますので、S3 バケットも後々作成します。
SNS トピックの作成
まずは、メール通知用の SNS トピックを作成します。
トピック名は InvalidPrincipalNotification
とします。
SNS トピックの作成手順は下記ブログがわかりやすくまとまっておりますので、ご参照ください。
Lambda 用 IAM ロールの作成
次に、Lambda 用に LambdaResourcePolicyMonitorRole
という名前で IAM ロールを作成します。以下ポリシーを割り当てます。
今回の検証では、S3 のリソースベースポリシーであるバケットポリシーを監視してみるので、s3:ListAllMyBuckets
s3:GetBucketPolicy
を付与しています。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:ListRoles",
"iam:GetRole",
"iam:ListRolePolicies",
"iam:GetRolePolicy"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets",
"s3:GetBucketPolicy"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:InvalidPrincipalNotification"
}
]
}
実際の運用では、管理を容易にするために AWS マネージドポリシーをアタッチするのが良いかと思いますが、今回の検証ではパッと見のわかりやすさから、1 つのカスタマー管理ポリシーにまとめております。
Lambda 関数の作成
Python の Lambda 関数 を TestDeletePrincipal
という名前で作成します。今回は Python3.13 を使用しております。
関数作成時に、先ほど作成した IAM ロール LambdaResourcePolicyMonitorRole
を選択してください。
Lambda では、AIDA
あるいは AROA
から始まるプリンシパルを検出し、検出されたコンソール URL を添付したメールを送信するようにします。
実際のコードは以下の通りです(少々長いので VScode にコピペするなどするとわかりやすいです)。
また、あらかじめ Lambda の環境変数に先ほど作成した SNS トピック を SNS_TOPIC_ARN
として設定しておきましょう。
import boto3
import json
import os
from botocore.exceptions import ClientError
# AWSクライアントの初期化
s3 = boto3.client('s3')
sns = boto3.client('sns')
sqs = boto3.client('sqs')
lambda_client = boto3.client('lambda')
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
BUCKETS_TO_MONITOR = os.environ.get('BUCKETS_TO_MONITOR', '').split(',')
# オプション: リージョンの指定が必要な場合
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1') # 例: 'ap-northeast-1'
def get_s3_buckets():
try:
response = s3.list_buckets()
return [bucket['Name'] for bucket in response['Buckets']]
except ClientError as e:
print(f"Error listing S3 buckets: {e}")
return []
def get_s3_bucket_policies(buckets):
policies = {}
for bucket in buckets:
try:
policy = s3.get_bucket_policy(Bucket=bucket)
policies[bucket] = json.loads(policy['Policy'])
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'NoSuchBucketPolicy':
policies[bucket] = {}
elif error_code == 'NoSuchBucket':
print(f"バケット '{bucket}' が存在しません。")
policies[bucket] = {}
else:
print(f"バケット '{bucket}' のポリシー取得中にエラーが発生しました: {e}")
policies[bucket] = {}
except Exception as e:
print(f"バケット '{bucket}' のポリシー取得中に予期しないエラーが発生しました: {e}")
policies[bucket] = {}
return policies
def get_sns_policies():
policies = {}
try:
response = sns.list_topics()
topics = response.get('Topics', [])
for topic in topics:
topic_arn = topic['TopicArn']
try:
policy = sns.get_topic_attributes(TopicArn=topic_arn)['Attributes'].get('Policy', '{}')
policies[topic_arn] = json.loads(policy)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'NotFound':
policies[topic_arn] = {}
else:
print(f"SNSトピック '{topic_arn}' のポリシー取得中にエラーが発生しました: {e}")
policies[topic_arn] = {}
except Exception as e:
print(f"SNSトピック '{topic_arn}' のポリシー取得中に予期しないエラーが発生しました: {e}")
policies[topic_arn] = {}
except ClientError as e:
print(f"Error listing SNS topics: {e}")
return policies
def get_sqs_policies():
policies = {}
try:
response = sqs.list_queues()
queues = response.get('QueueUrls', [])
for queue_url in queues:
try:
attributes = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['Policy'])['Attributes']
policy = attributes.get('Policy', '{}')
policies[queue_url] = json.loads(policy)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'AWS.SimpleQueueService.NonExistentQueue':
print(f"キュー '{queue_url}' が存在しません。")
policies[queue_url] = {}
else:
print(f"SQSキュー '{queue_url}' のポリシー取得中にエラーが発生しました: {e}")
policies[queue_url] = {}
except Exception as e:
print(f"SQSキュー '{queue_url}' のポリシー取得中に予期しないエラーが発生しました: {e}")
policies[queue_url] = {}
except ClientError as e:
print(f"Error listing SQS queues: {e}")
return policies
def get_lambda_policies():
policies = {}
try:
response = lambda_client.list_functions()
functions = response.get('Functions', [])
for function in functions:
function_name = function['FunctionName']
try:
policy_response = lambda_client.get_policy(FunctionName=function_name)
policy = json.loads(policy_response['Policy'])
policies[function_name] = policy
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'ResourceNotFoundException':
policies[function_name] = {}
else:
print(f"Lambda関数 '{function_name}' のポリシー取得中にエラーが発生しました: {e}")
policies[function_name] = {}
except Exception as e:
print(f"Lambda関数 '{function_name}' のポリシー取得中に予期しないエラーが発生しました: {e}")
policies[function_name] = {}
except ClientError as e:
print(f"Error listing Lambda functions: {e}")
return policies
def check_principal_arns(policy_dict, resource_type):
problematic_resources = []
for resource, policy in policy_dict.items():
if 'Statement' in policy:
statements = policy['Statement']
if not isinstance(statements, list):
statements = [statements]
for stmt in statements:
principals = []
if 'Principal' in stmt:
principal = stmt['Principal']
if isinstance(principal, dict):
for key, value in principal.items():
if isinstance(value, str):
principals.append(value)
elif isinstance(value, list):
principals.extend(value)
elif isinstance(principal, str):
principals.append(principal)
for principal_arn in principals:
if isinstance(principal_arn, str) and (principal_arn.startswith('AROA') or principal_arn.startswith('AIDA')):
# 管理コンソールURLの生成
if resource_type == 's3':
url = f"https://s3.console.aws.amazon.com/s3/buckets/{resource}/permissions?tab=permissions"
elif resource_type == 'sns':
url = f"https://console.aws.amazon.com/sns/v3/home?region={AWS_REGION}#/topics/{resource}/permissions"
elif resource_type == 'sqs':
url = f"https://console.aws.amazon.com/sqs/v2/home?region={AWS_REGION}#/queue/{resource}/permissions"
elif resource_type == 'lambda':
url = f"https://console.aws.amazon.com/lambda/home?region={AWS_REGION}#/functions/{resource}/permissions"
else:
url = ""
problematic_resources.append({
'ResourceType': resource_type.upper(),
'Resource': resource,
'PrincipalARN': principal_arn,
'Policy': policy,
'URL': url
})
return problematic_resources
def lambda_handler(event, context):
all_problems = []
# 監視対象のバケットを取得
if BUCKETS_TO_MONITOR and BUCKETS_TO_MONITOR != ['']:
s3_buckets = BUCKETS_TO_MONITOR
else:
s3_buckets = get_s3_buckets()
# S3ポリシー取得とチェック
s3_policies = get_s3_bucket_policies(s3_buckets)
s3_problems = check_principal_arns(s3_policies, 's3')
all_problems.extend(s3_problems)
# SNSポリシー取得とチェック
sns_policies = get_sns_policies()
sns_problems = check_principal_arns(sns_policies, 'sns')
all_problems.extend(sns_problems)
# SQSポリシー取得とチェック
sqs_policies = get_sqs_policies()
sqs_problems = check_principal_arns(sqs_policies, 'sqs')
all_problems.extend(sqs_problems)
# Lambdaポリシー取得とチェック
lambda_policies = get_lambda_policies()
lambda_problems = check_principal_arns(lambda_policies, 'lambda')
all_problems.extend(lambda_problems)
if all_problems:
message = "以下のリソースでプリンシパルに'AROA'または'AIDA'で始まるARNが検出されました。\n\n"
for problem in all_problems:
message += (
f"リソースタイプ: {problem['ResourceType']}\n"
f"リソース名: {problem['Resource']}\n"
f"プリンシパルARN: {problem['PrincipalARN']}\n"
f"ポリシー: {json.dumps(problem['Policy'], indent=2)}\n"
f"管理コンソールURL: {problem['URL']}\n\n"
)
# SNSで通知
try:
sns.publish(
TopicArn=SNS_TOPIC_ARN,
Subject='リソースベースポリシーのプリンシパルにAROA/AIDAが検出されました',
Message=message
)
print(f"{len(all_problems)} 件の問題を検出し、通知を送信しました。")
except ClientError as e:
print(f"SNSへの通知送信中にエラーが発生しました: {e}")
return {
'statusCode': 200,
'body': json.dumps(f"{len(all_problems)} 件の問題を検出し、通知を送信しました。")
}
else:
print("問題は検出されませんでした。")
return {
'statusCode': 200,
'body': json.dumps('問題は検出されませんでした。')
}
上記コードをデプロイしたら、テストしてみましょう。
テストタブのイベント JSON に {}
と記述しテストを実行してみてください。
EventBridge ルールを作成
EventBridge ルールを作成しましょう。
Lambda の「トリガーを追加」から EventBridge を選択し、下記ドキュメントを参考にお好みのスケジュールを作成します。
今回は下記のように設定しました。
毎週水曜日の 8:30(UTC) にイベントスケジュールを作成しています。
cron(30 8 ? * WED *)
検証用のバケットと IAM ユーザーおよび IAM ロール作成
最後に、検証用のバケットと IAM ユーザーおよび IAM ロールを作成しましょう。
それぞれ以下のような名称で作成します。
S3 バケット:testdeleteprincipal
IAM ユーザー:TestDeletePrincipalIAMUser
IAM ロール:TestDeletePrincipalIAMRole
そして、バケットポリシーを以下のように設定しましょう。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowTestUserAccess",
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::xxxxxxxxxxxx:user/TestDeletePrincipalIAMUser",
"arn:aws:iam::xxxxxxxxxxxx:role/TestDeletePrincipalIAMRole"
]
},
"Action": "s3:*",
"Resource": "arn:aws:s3:::testdeleteprincipal/*"
}
]
}
バケットポリシーを設定後、IAM ユーザーあるいは IAM ロールを削除しておきます。
今回はIAM ユーザー TestDeletePrincipalIAMUser
を削除しておきました。
検証結果
設定お疲れ様でした!
さて、実際にスケジュールした時刻を待って(あるいは Lambda のテスト実行)により、関数を動かしてみましょう。
すると以下のようなメールが指定したメールアドレスに届きます。
実際に当該バケット testdeleteprincipal
のバケットポリシーをのぞいてみると以下のようになっていました。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowTestUserAccess",
"Effect": "Allow",
"Principal": {
"AWS": [
"AIDAxxxxxxxxxxxxxxxxx",
"arn:aws:iam::xxxxxxxxxxxx:role/TestDeletePrincipalIAMRole"
]
},
"Action": "s3:*",
"Resource": "arn:aws:s3:::testdeleteprincipal/*"
}
]
}
通知メール同様、削除された IAM ユーザーを指定していたプリンシパルが AIDA から始まる文字列に置き換わっていますね!
まとめ
お疲れ様でした!長くなりましたが、今回の検証は終わりです!
無効となり、自動的に置き換わったプリンシパルを検知およびメール通知させるにはなかなか手間がかかりましたね。プリンシパルが置き換わってしまうのは AWS 側の変更なので CloudTrail には記録されず検知するには少し工夫が必要ですね。
ただ、大量のリソースの中から人力で探し出すよりは遥かに楽になるかと思います。
気になった方は試してみると良いかもしれません!
参考文献
- EメールでAmazon SNSからのメッセージを受信する | DevelopersIO
- IAM リソースベースのポリシーフォーマットを理解する | AWS re:Post
- cron 式と rate 式を使用して Amazon のルールをスケジュールする EventBridge - Amazon EventBridge
アノテーション株式会社
アノテーション株式会社は、クラスメソッド社のグループ企業として「オペレーション・エクセレンス」を担える企業を目指してチャレンジを続けています。「らしく働く、らしく生きる」のスローガンを掲げ、様々な背景をもつ多様なメンバーが自由度の高い働き方を通してお客様へサービスを提供し続けてきました。現在当社では一緒に会社を盛り上げていただけるメンバーを募集中です。少しでもご興味あれば、アノテーション株式会社WEBサイトをご覧ください。